package com.lianda.operator;

import java.util.regex.Pattern;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/**
 * Demonstrates the Flink {@code flatMap} operator: every line read from a text socket is
 * split into words, so one input record produces zero or more output records.
 */
public class FlatMapMain {
    /** Host of the text socket the job reads lines from. */
    private static final String HOSTNAME = "127.0.0.1";

    /** Port of the text socket the job reads lines from. */
    private static final int PORT = 9000;

    /** Word delimiter: one or more non-word characters. Compiled once instead of per record. */
    private static final Pattern NON_WORD = Pattern.compile("\\W+");

    /**
     * Builds and runs the streaming job: socket source, then flatMap (line to words), then print.
     *
     * @param args command-line arguments; not used
     * @throws Exception if the job cannot be set up or fails while executing
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> stream = env.socketTextStream(HOSTNAME, PORT);

        // Kept as an anonymous class (not a lambda) so the String output type stays visible
        // in the class signature despite the generic Collector parameter.
        SingleOutputStreamOperator<String> output =
                stream.flatMap(new FlatMapFunction<String, String>() {
                    @Override
                    public void flatMap(String line, Collector<String> out) throws Exception {
                        // Split the sentence into words.
                        for (String word : NON_WORD.split(line)) {
                            // split() yields an empty first token when the line starts with
                            // a delimiter, and a single "" for an empty line; those are not
                            // words, so do not emit them.
                            if (!word.isEmpty()) {
                                out.collect(word);
                            }
                        }
                    }
                });

        output.print();
        env.execute("demo");
    }
}
